👋 환영합니다! 📢 페이스북에 공유하기
### 기존 코드 활용 방식 PS \03ST\0316PythonAI\pxAgent> python run.py == px agent (YouTube sermon summarizer) == Type 'exit' to quit. You: exit \03ST\0316PythonAI\pxAgent> { #C}

PXAGENT/
├─ agent/
│ ├─ init.py
│ ├─ agent.py 👈 여기!
│ ├─ tools/
│ └─ ...
├─ run.py (또는 pxagent002.py / run_pxagent002.py)

from agent.agent import PxAgent # 절대 경로 임포트 사용


def main():

    print("\n== px agent (YouTube sermon summarizer) ==\nType 'exit' to quit.\n")

    agent = PxAgent()

    while True:

        user = input("You: ").strip()

        if not user or user.lower() in {"exit", "quit"}:

            break

        # agent.run은 이미 tenacity로 감싸져 있어 에러가 나면 재시도함.

        try:

            result = agent.run(user)

            print("\nAssistant:\n" + result.output_text + "\n")

        except Exception as e:

            print(f"\n[ERROR] 에이전트 실행 중 예외 발생: {e}\n")

  
  

if __name__ == "__main__":

    main()

agent/agent.py

# agent/agent.py

from __future__ import annotations

  

from typing import Dict, Any, List, Optional, Tuple

from dataclasses import dataclass, field

from tenacity import retry, stop_after_attempt, wait_exponential_jitter

import re

import os

import smtplib

from email.message import EmailMessage

from mimetypes import guess_type

from datetime import datetime

import openai

  

from .config import settings

from .memory import MemoryStore

from .safety import redact_pii

from .tool_router import run_tool

  
  

# =========================

# 유틸: 바탕화면(Desktop) 경로 탐지

#  - 우선순위:

#    1) Windows API(SHGetFolderPathW) - 실제 시스템 데스크톱

#    2) 레지스트리(Shell Folders)

#    3) 환경변수 기반 후보들(Desktop, 바탕화면, OneDrive/Desktop, OneDrive/바탕화면)

#    4) 마지막 폴백: 홈 디렉토리

# =========================

def _desktop_dir() -> str:

    # 1) Windows API

    try:

        import ctypes

        from ctypes import wintypes

        CSIDL_DESKTOPDIRECTORY = 0x10  # 실제 데스크톱 폴더

        SHGFP_TYPE_CURRENT = 0

        buf = ctypes.create_unicode_buffer(wintypes.MAX_PATH)

        if ctypes.windll.shell32.SHGetFolderPathW(None, CSIDL_DESKTOPDIRECTORY, None, SHGFP_TYPE_CURRENT, buf) == 0:

            p = buf.value

            if p and os.path.isdir(p):

                return p

    except Exception:

        pass

  

    # 2) 레지스트리 (Windows만)

    try:

        import winreg  # type: ignore

        with winreg.OpenKey(winreg.HKEY_CURRENT_USER, r"Software\Microsoft\Windows\CurrentVersion\Explorer\Shell Folders") as key:

            val, _ = winreg.QueryValueEx(key, "Desktop")

            if val and os.path.isdir(val):

                return val

    except Exception:

        pass

  

    # 3) 일반 후보들

    home = os.path.expanduser("~")

    candidates = [

        os.path.join(home, "Desktop"),

        os.path.join(home, "바탕화면"),

        os.path.join(home, "OneDrive", "Desktop"),

        os.path.join(home, "OneDrive", "바탕화면"),

        # Microsoft 365/회사 테넌트 OneDrive 표기 변형도 시도

        os.path.join(home, "OneDrive - Microsoft", "Desktop"),

        os.path.join(home, "OneDrive - Microsoft", "바탕화면"),

    ]

    for p in candidates:

        if os.path.isdir(p):

            return p

  

    # 4) 폴백

    return home

  
  

# =========================

# RTF 유니코드 유틸 (python-docx 없어도 한글 완벽)

# =========================

def _signed16(n: int) -> int:

    n = n & 0xFFFF

    return n - 0x10000 if n >= 0x8000 else n

  

def _rtf_escape_char(c: str) -> str:

    if c == "\\":

        return r"\\"

    if c == "{":

        return r"\{"

    if c == "}":

        return r"\}"

    code = ord(c)

    if 0x20 <= code <= 0x7E:

        return c

    if code > 0xFFFF:

        u = code - 0x10000

        high = 0xD800 + (u >> 10)

        low  = 0xDC00 + (u & 0x3FF)

        return rf"\u{_signed16(high)}?\u{_signed16(low)}?"

    return rf"\u{_signed16(code)}?"

  

def _text_to_rtf_body(text: str) -> str:

    parts = []

    for para in text.split("\n\n"):

# [수정] 각 문단을 다시 줄바꿈(\n) 기준으로 분리

        lines_in_para = []

        for ln in para.splitlines():

            converted = "".join(_rtf_escape_char(c) for c in ln)

            lines_in_para.append(converted)

        # [수정] \line(줄바꿈)으로 줄을 연결하고 \par(문단 끝) 추가

        parts.append(r"\line ".join(lines_in_para) + r"\par ")

    return "\n".join(parts)

  

def _rtf_wrap(body: str) -> str:

    return (

        r"{\rtf1\ansi\deff0"

        r"{\fonttbl{\f0 Arial;}}"

        r"\viewkind4\uc1\pard\f0\fs24 "

        + body +

        r"}"

    )

  

def _safe_filename(stem: str) -> str:

    stem = stem.strip()

    stem = re.sub(r"[^\w\-]+", "_", stem)[:60] or "pxdoc"

    return stem

  
  

# =========================

# 시간 파서

# =========================

_TIME = r"(\d{1,2}):(\d{2}):(\d{2})"

  

def _norm(h: str, m: str, s: str) -> str:

    return f"{int(h):02d}:{int(m):02d}:{int(s):02d}"

  

def _parse_times_freeform(s: str) -> Tuple[Optional[str], Optional[str]]:

    if not s:

        return (None, None)

    s = s.strip()

  

    m = re.search(rf"{_TIME}\s*[-~]\s*{_TIME}", s)

    if m:

        a = _norm(*m.groups()[0:3])

        b = _norm(*m.groups()[3:6])

        return (a, b)

  

    m = re.search(rf"(?:^|[\s])-(?P<h>\d{{1,2}}):(?P<m>\d{{2}}):(?P<s>\d{{2}})", s)

    if m:

        b = _norm(m.group("h"), m.group("m"), m.group("s"))

        return (None, b)

  

    m = re.search(rf"(?:끝|end)\s+{_TIME}", s, flags=re.I)

    if m:

        b = _norm(*m.groups()[0:3])

        return (None, b)

  

    m = re.search(rf"(?:시작|start)\s+{_TIME}", s, flags=re.I)

    if m:

        a = _norm(*m.groups()[0:3])

        return (a, None)

  

    m = re.search(rf"{_TIME}", s)

    if m:

        a = _norm(*m.groups()[0:3])

        return (a, None)

  

    return (None, None)

  

def _validate_time_order(s: Optional[str], e: Optional[str]) -> Tuple[Optional[str], Optional[str], Optional[str]]:

    def to_sec(t: Optional[str]) -> Optional[int]:

        if not t:

            return None

        try:

            hh, mm, ss = map(int, t.split(":"))

        except Exception:

            return None

        if not (0 <= mm < 60 and 0 <= ss < 60 and hh >= 0):

            return None

        return hh * 3600 + mm * 60 + ss

  

    ss, ee = to_sec(s), to_sec(e)

    if s and ss is None:

        return None, None, "시간 형식이 잘못되었습니다(분/초는 00–59여야 합니다)."

    if e and ee is None:

        return None, None, "시간 형식이 잘못되었습니다(분/초는 00–59여야 합니다)."

    if ss is not None and ee is not None and ss >= ee:

        return None, None, "시작 시간이 끝 시간보다 크거나 같습니다."

    return s, e, None

  
  

# =========================

# 생성 옵션

# =========================

@dataclass

class GenOptions:

    sections: List[str] = field(default_factory=lambda: [

        "시작 기도문", "아이스브레이크", "본문 요약", "본문 이해 질문", "적용 질문", "끝나는 기도문"

    ])

    start_prayer_lines: Tuple[int, int] = (8, 10)

    end_prayer_lines:   Tuple[int, int] = (7, 10)

    icebreakers_count:  int = 3

    understanding_q:    int = 7

    application_q:      int = 7

    summary_chars:      Tuple[int, int] = (5000, 7000)

    length_hint:        str = "normal"  # short|normal|long

  
  

@dataclass

class AgentResult:

    output_text: str

    tool_runs: List[Dict[str, Any]]

  
  

class PxAgent:

    """

    - 유튜브 링크(+시간) 전사 → 지정 섹션만 생성(‘기도문만’ 등 옵션 기억)

    - 결과는 즉시 바탕화면(또는 로컬 Desktop)에 RTF/DOC(=RTF) 저장, python-docx 있으면 DOCX도 저장

    - 저장 직후 settings의 SMTP로 이메일 첨부 전송(설정 없으면 생략)

    """

    def __init__(self, memory: Optional[MemoryStore] = None):

        self.memory = memory or MemoryStore()

        self.client = openai.OpenAI(api_key=settings.openai_api_key)

  

        self._yt_url_re = re.compile(r"(https?://(?:www\.)?youtube\.com/\S+|https?://youtu\.be/\S+)", re.I)

        self.pending_url: Optional[str] = None

        self._pending_opts: Optional[GenOptions] = None

  

        self._model = settings.model

        self._max_out = int(getattr(settings, "max_output_tokens", 3072))

        self._mem_sum_tokens = int(getattr(settings, "memory_summary_tokens", 800))

        self._chunk_sec = int(getattr(settings, "youtube_chunk_duration_sec", 600))

        self._overlap_sec = int(getattr(settings, "youtube_overlap_sec", 1))

  

        # python-docx 사용 가능 여부

        try:

            from docx import Document  # noqa: F401

            self._has_docx = True

        except Exception:

            self._has_docx = False

  

    # ----------------- 내부 유틸 -----------------

    def _set_pending_url(self, url: str) -> None:

        self.pending_url = url

  

    def _get_pending_url(self) -> Optional[str]:

        return self.pending_url

  

    def _parse_generation_options(self, text: str) -> GenOptions:

        t = text.lower()

        opt = GenOptions()

  

        want_prayer_only = ("기도문만" in text) or ("끝나는 기도문만" in text) or ("start prayer only" in t)

        if want_prayer_only:

            if "시작" in text or "start" in t:

                opt.sections = ["시작 기도문"]

            elif "끝" in text or "마무리" in text or "end" in t:

                opt.sections = ["끝나는 기도문"]

            else:

                opt.sections = ["끝나는 기도문"]

  

        if ("아이스브레이크만" in text) or ("icebreakers only" in t):

            opt.sections = ["아이스브레이크"]

        if ("요약만" in text) or ("본문 요약만" in text) or ("summary only" in t):

            opt.sections = ["본문 요약"]

        if ("이해 질문만" in text):

            opt.sections = ["본문 이해 질문"]

        if ("적용 질문만" in text):

            opt.sections = ["적용 질문"]

  

        m = re.search(r"아이스브레이크\s*(\d+)", text)

        if m: opt.icebreakers_count = max(1, int(m.group(1)))

        m = re.search(r"(이해\s*질문|본문\s*이해\s*질문)\s*(\d+)", text)

        if m: opt.understanding_q = max(1, int(m.group(2)))

        m = re.search(r"(적용\s*질문)\s*(\d+)", text)

        if m: opt.application_q = max(1, int(m.group(2)))

        m = re.search(r"(기도문|끝나는\s*기도문|마무리\s*기도)\s*(\d+)\s*~\s*(\d+)\s*줄", text)

        if m:

            a, b = int(m.group(2)), int(m.group(3))

            if "끝" in m.group(1) or "마무리" in m.group(1):

                opt.end_prayer_lines = (min(a, b), max(a, b))

            else:

                opt.start_prayer_lines = (min(a, b), max(a, b))

  

        if ("짧게" in text) or ("간단히" in text) or ("short" in t):

            opt.length_hint = "short"; opt.summary_chars = (1500, 2200)

        if ("길게" in text) or ("자세히" in text) or ("long" in t):

            opt.length_hint = "long"; opt.summary_chars = (7000, 9000)

  

        return opt

  

    def _has_explicit_prefs(self, text: str) -> bool:

        t = text.lower()

        markers = [

            "기도문만", "끝나는 기도문만", "아이스브레이크만", "요약만",

            "본문 요약만", "summary only", "icebreakers only",

            "짧게", "간단히", "short", "길게", "자세히", "long",

            "아이스브레이크", "이해 질문", "적용 질문", "기도문", "마무리 기도",

        ]

        if any(m in text for m in markers) or any(m in t for m in markers):

            return True

        if re.search(r"아이스브레이크\s*\d+", text): return True

        if re.search(r"(이해\s*질문|본문\s*이해\s*질문)\s*\d+", text): return True

        if re.search(r"(적용\s*질문)\s*\d+", text): return True

        if re.search(r"(기도문|끝나는\s*기도문|마무리\s*기도)\s*\d+\s*~\s*\d+\s*줄", text): return True

        return False

  

    def _remember_prefs_if_any(self, text: str) -> None:

        if self._has_explicit_prefs(text):

            self._pending_opts = self._parse_generation_options(text)

  

    def _resolve_generation_options(self, text: str) -> GenOptions:

        current = self._parse_generation_options(text)

        if not self._has_explicit_prefs(text) and self._pending_opts:

            use = self._pending_opts

            self._pending_opts = None

            return use

        if self._has_explicit_prefs(text):

            self._pending_opts = None

            return current

        return current

  

    def _filter_to_allowed_sections(self, text: str, allowed: List[str]) -> str:

        all_headers = ["시작 기도문", "아이스브레이크", "본문 요약", "본문 이해 질문", "적용 질문", "끝나는 기도문"]

        forbidden = [h for h in all_headers if h not in allowed]

        cleaned = text

        for h in forbidden:

            pattern = rf"\n?#\s*(?:\d+\)\s*)?{re.escape(h)}[^\n]*\n.*?(?=\n#\s|\Z)"

            cleaned = re.sub(pattern, "", cleaned, flags=re.S | re.I)

        cleaned = re.sub(r"\n{3,}", "\n\n", cleaned).strip()

        return cleaned

  

# ----------------- 파일 내보내기 + 이메일 -----------------

    # [새로 추가되는 메서드] 전사문 원본 저장용

    def _save_raw_transcript(self, text: str, title_hint: str = "raw_transcript") -> Optional[str]:

        """

        전사문 원본을 바탕화면에 .txt 파일로 저장합니다.

        실패 시 None을 반환하지만, 메인 로직을 중단하지는 않습니다.

        """

        try:

            ts = datetime.now().strftime("%Y%m%d_%H%M%S")

            base = f"{_safe_filename(title_hint)}_{ts}"

            desktop = _desktop_dir()

            filepath = os.path.join(desktop, f"{base}.txt")

            # 한글 저장을 위해 encoding="utf-8" 필수

            with open(filepath, "w", encoding="utf-8") as f:

                f.write(text)

            return filepath

        except Exception:

            # 저장에 실패해도 전체 프로세스는 계속 진행되도록 예외를 삼킵니다.

            # 필요하다면 여기에 로깅을 추가할 수 있습니다.

            return None

  

    # ----------------- 파일 내보내기 + 이메일 -----------------

    def _export_files(self, text: str, title_hint: str = "px_sermon") -> Dict[str, str]:

        """

        결과 텍스트를 바탕화면에 저장:

        - 항상 RTF, DOC(=RTF) 저장

        - python-docx가 있으면 DOCX도 저장

        반환: {"rtf": path, "doc": path, "docx": path?}

        """

        ts = datetime.now().strftime("%Y%m%d_%H%M%S")

        base = f"{_safe_filename(title_hint)}_{ts}"

        desktop = _desktop_dir()

  

        # RTF 내용

        body = _text_to_rtf_body(text)

        rtf = _rtf_wrap(body)

  

        out: Dict[str, str] = {}

  

        rtf_path = os.path.join(desktop, f"{base}.rtf")

        with open(rtf_path, "w", encoding="ascii", errors="strict") as f:

            f.write(rtf)

        out["rtf"] = rtf_path

  

        doc_path = os.path.join(desktop, f"{base}.doc")

        with open(doc_path, "w", encoding="ascii", errors="strict") as f:

            f.write(rtf)

        out["doc"] = doc_path

  

        if self._has_docx:

            try:

                from docx import Document

                docx_path = os.path.join(desktop, f"{base}.docx")

                doc = Document()

                for para in text.split("\n\n"):

                    doc.add_paragraph(para)

                doc.save(docx_path)

                out["docx"] = docx_path

            except Exception:

                pass

  

        return out

  

    def _send_email_with_attachments(

        self,

        subject: str,

        body: str,

        to_email: str,

        attachments: List[str],

        smtp_host: str,

        smtp_port: int,

        smtp_user: str,

        smtp_pass: str,

        from_email: Optional[str] = None,

        use_tls: bool = True,

    ) -> Tuple[bool, Optional[str]]:

        msg = EmailMessage()

        msg["Subject"] = subject

        msg["To"] = to_email

        msg["From"] = from_email or smtp_user

        msg.set_content(body)

  

        for path in attachments:

            try:

                ctype, _ = guess_type(path)

                maintype, subtype = (ctype.split("/", 1) if ctype else ("application", "octet-stream"))

                with open(path, "rb") as fp:

                    data = fp.read()

                filename = os.path.basename(path)

                msg.add_attachment(data, maintype=maintype, subtype=subtype, filename=filename)

            except Exception as e:

                return False, f"첨부 실패: {path} ({e})"

  

        try:

            if use_tls:

                with smtplib.SMTP(smtp_host, smtp_port) as s:

                    s.starttls()

                    s.login(smtp_user, smtp_pass)

                    s.send_message(msg)

            else:

                with smtplib.SMTP_SSL(smtp_host, smtp_port) as s:

                    s.login(smtp_user, smtp_pass)

                    s.send_message(msg)

            return True, None

        except Exception as e:

            return False, f"이메일 전송 실패: {e}"

  

    # ----------------- YouTube 워크플로우 -----------------

    def _maybe_youtube_workflow(self, text: str) -> Optional[AgentResult]:

        # 사용자 의도 기억

        self._remember_prefs_if_any(text)

  

        # URL 대기 상태에서 시간만 온 경우

        pending = self._get_pending_url()

        if pending:

            s, e = _parse_times_freeform(text)

            if s or e:

                s, e, err = _validate_time_order(s, e)

                if err:

                    out = f"시간 입력 오류: {err}"

                    self._remember(text, out)

                    return AgentResult(output_text=out, tool_runs=[])

  

                yt = run_tool("youtube_stt", {

                    "url": pending, "lang_hint": "ko",

                    "sermon_start": s, "sermon_end": e,

                    "chunk_duration_sec": self._chunk_sec,

                    "overlap_sec": self._overlap_sec,

                })

  

                tool_runs = [{

                    "name": "youtube_stt",

                    "args": {

                        "url": pending, "sermon_start": s, "sermon_end": e,

                        "chunk_duration_sec": self._chunk_sec, "overlap_sec": self._overlap_sec

                    },

                    "result": {"ok": yt.get("ok", False), "error": yt.get("error")},

                }]

  

                if not yt.get("ok"):

                    out = f"전사 실패: {yt.get('error')}\n시간을 다시 주시거나, 다른 영상 링크를 주세요."

                    self._remember(text, out)

                    return AgentResult(output_text=out, tool_runs=tool_runs)

  

                transcript = yt["result"]["text"]

  

                # ▼▼▼▼▼ [추가] 전사문 원본 저장 호출 ▼▼▼▼▼

                saved_txt_path = self._save_raw_transcript(transcript, title_hint="youtube_raw")

                saved_msg_extra = f"\n(참고: 전사문 원본도 저장되었습니다: {os.path.basename(saved_txt_path)})" if saved_txt_path else ""

                # ▲▲▲▲▲ [추가 끝] ▲▲▲▲▲

                opts = self._resolve_generation_options(text)

                out = self._compose_px_doc(transcript, opts)

                out = self._filter_to_allowed_sections(out, opts.sections)

  

                # === 저장 + 이메일 ===

                files = self._export_files(out, title_hint="sermon_output")

                saved_msg = " \n".join([f"- {k.upper()}: {v}" for k, v in files.items()])

                footer = (

                    "\n\n---\n"

                    "바탕화면에 파일을 저장했습니다:\n" + saved_msg +

                    # ▼▼▼ 아래 줄 추가 (위에서 만든 saved_msg_extra 변수 활용) ▼▼▼

                    saved_msg_extra +

                    "\n\n이 파일들은 설정된 메일로도 전송합니다." # (참고: 원본 txt는 이메일로 보내지 않습니다)

                )

  

                # 이메일 전송

                recipient = getattr(settings, "recipient_email", None)

                smtp_host = getattr(settings, "smtp_host", None)

                smtp_port = int(getattr(settings, "smtp_port", 587))

                smtp_user = getattr(settings, "smtp_user", None)

                smtp_pass = getattr(settings, "smtp_pass", None)

                from_email = getattr(settings, "from_email", None)

                use_tls_flag = (smtp_port != 465)

  

                status_line = ""

                if recipient and smtp_host and smtp_user and smtp_pass:

                    ok, err = self._send_email_with_attachments(

                        subject="[PX Agent] 설교 자료",

                        body="자동 생성된 설교 자료를 첨부합니다.",

                        to_email=recipient,

                        attachments=list(files.values()),

                        smtp_host=smtp_host,

                        smtp_port=smtp_port,

                        smtp_user=smtp_user,

                        smtp_pass=smtp_pass,

                        from_email=from_email,

                        use_tls=use_tls_flag,

                    )

                    status_line = "\n이메일로도 발송했습니다." if ok else f"\n이메일 발송 실패: {err}"

                else:

                    status_line = "\n(참고) settings에 SMTP/recipient_email이 설정되어 있지 않아 이메일은 생략되었습니다."

  

                out = out + footer + status_line

  

                self.pending_url = None

                self._pending_opts = None

                self._remember(text, out)

                return AgentResult(output_text=out, tool_runs=tool_runs)

  

        # 이번 입력에 URL이 포함된 경우

        m = self._yt_url_re.search(text)

        if not m:

            return None

        url = m.group(1)

  

        s, e = _parse_times_freeform(text)

        if s or e:

            s, e, err = _validate_time_order(s, e)

            if err:

                out = f"시간 입력 오류: {err}"

                self._remember(text, out)

                return AgentResult(output_text=out, tool_runs=[])

  

            yt = run_tool("youtube_stt", {

                "url": url, "lang_hint": "ko",

                "sermon_start": s, "sermon_end": e,

                "chunk_duration_sec": self._chunk_sec,

                "overlap_sec": self._overlap_sec,

            })

            tool_runs = [{

                "name": "youtube_stt",

                "args": {

                    "url": url, "sermon_start": s, "sermon_end": e,

                    "chunk_duration_sec": self._chunk_sec, "overlap_sec": self._overlap_sec

                },

                "result": {"ok": yt.get("ok", False), "error": yt.get("error")},

            }]

            if not yt.get("ok"):

                out = f"전사 실패: {yt.get('error')}\n시간을 다시 주시거나, 다른 영상 링크를 주세요."

                self._remember(text, out)

                return AgentResult(output_text=out, tool_runs=tool_runs)

  

            transcript = yt["result"]["text"]

            # ▼▼▼▼▼ [추가] 전사문 원본 저장 호출 ▼▼▼▼▼

            saved_txt_path = self._save_raw_transcript(transcript, title_hint="youtube_raw")

            saved_msg_extra = f"\n(참고: 전사문 원본도 저장되었습니다: {os.path.basename(saved_txt_path)})" if saved_txt_path else ""

            # ▲▲▲▲▲ [추가 끝] ▲▲▲▲▲

            opts = self._resolve_generation_options(text)

            out = self._compose_px_doc(transcript, opts)

            out = self._filter_to_allowed_sections(out, opts.sections)

  

            # === 저장 + 이메일 ===

            files = self._export_files(out, title_hint="sermon_output")

            saved_msg = " \n".join([f"- {k.upper()}: {v}" for k, v in files.items()])

            footer = (

                "\n\n---\n"

                "바탕화면에 파일을 저장했습니다:\n" + saved_msg +

                # ▼▼▼ 아래 줄 추가 (위에서 만든 saved_msg_extra 변수 활용) ▼▼▼

                saved_msg_extra +

                "\n\n이 파일들은 설정된 메일로도 전송합니다." # (참고: 원본 txt는 이메일로 보내지 않습니다)

            )

  

            recipient = getattr(settings, "recipient_email", None)

            smtp_host = getattr(settings, "smtp_host", None)

            smtp_port = int(getattr(settings, "smtp_port", 587))

            smtp_user = getattr(settings, "smtp_user", None)

            smtp_pass = getattr(settings, "smtp_pass", None)

            from_email = getattr(settings, "from_email", None)

            use_tls_flag = (smtp_port != 465)

  

            status_line = ""

            if recipient and smtp_host and smtp_user and smtp_pass:

                ok, err = self._send_email_with_attachments(

                    subject="[PX Agent] 설교 자료",

                    body="자동 생성된 설교 자료를 첨부합니다.",

                    to_email=recipient,

                    attachments=list(files.values()),

                    smtp_host=smtp_host,

                    smtp_port=smtp_port,

                    smtp_user=smtp_user,

                    smtp_pass=smtp_pass,

                    from_email=from_email,

                    use_tls=use_tls_flag,

                )

                status_line = "\n이메일로도 발송했습니다." if ok else f"\n이메일 발송 실패: {err}"

            else:

                status_line = "\n(참고) settings에 SMTP/recipient_email이 설정되어 있지 않아 이메일은 생략되었습니다."

  

            out = out + footer + status_line

  

            self._remember(text, out)

            return AgentResult(output_text=out, tool_runs=tool_runs)

  

        # URL만 온 경우 → 시간 요청

        self._set_pending_url(url)

        ask = (

            "설교 **시작 시간**과 **끝 시간**을 알려주세요.\n"

            "- 형식: HH:MM:SS (예: 00:27:15 01:12:40, 또는 00:27:15-01:12:40)\n"

            "- 시간 1개만 주시면 시작으로 간주하고 **영상 끝**까지 처리합니다.\n"

            "- 끝만 주시려면 '-01:12:40' 또는 '끝 01:12:40'처럼 보내주세요."

        )

        self._remember(text, ask)

        return AgentResult(output_text=ask, tool_runs=[])

  

    # ----------------- 퍼블릭 엔트리 -----------------

    @retry(stop=stop_after_attempt(3), wait=wait_exponential_jitter(1, 3))

    def run(self, user_text: str) -> AgentResult:

        r = self._maybe_youtube_workflow(user_text)

        if r:

            return r

        msg = "예배용 YouTube 링크를 알려주세요. (예: https://youtu.be/XXXXXXXXXXX)"

        self._remember(user_text, msg)

        return AgentResult(output_text=msg, tool_runs=[])

  

    # ----------------- 요약/생성 -----------------

    def _summarize_transcript(self, transcript: str) -> str:

        if not transcript or len(transcript) < 1200:

            return transcript

        sys = (

            "다음 설교 전사문을 7,000~10,000자 정도로 아주 자세한 한국어 개요로 응집하세요. "

            "핵심 논지/흐름/중요 인용만 유지하고 반복은 줄이세요."

        )

        resp = self.client.responses.create(

            model=self._model,

            input=[

                {"role": "system", "content": [{"type": "input_text", "text": sys}]},

                {"role": "user", "content": [{"type": "input_text", "text": transcript}]},

            ],

            max_output_tokens=min(self._max_out, 3500),

        )

        return self._collect_text(resp)

    # ▲▲▲ (참고: 한글 1만 자는 대략 3000~3500토큰 정도 됩니다)

  

    def _compose_px_doc(self, transcript: str, opts: Optional[GenOptions] = None) -> str:

        opts = opts or GenOptions()

        brief = self._summarize_transcript(transcript)

  

        parts = []

        if "시작 기도문" in opts.sections:

            a, b = opts.start_prayer_lines; parts.append(f"1) 시작 기도문 ({a}–{b}줄)")

        if "아이스브레이크" in opts.sections:

            parts.append(f"2) 아이스브레이크 ({opts.icebreakers_count}문항, 누구나 쉽게)")

        if "본문 요약" in opts.sections:

            lo, hi = opts.summary_chars; parts.append(f"3) 본문 요약 (~{lo}–{hi}자, 수필체, 중복 최소화, 흐름 유지, 소제목 3–5 허용)")

        if "본문 이해 질문" in opts.sections:

            parts.append(f"4) 본문 이해 질문 (정확히 {opts.understanding_q}문항)")

        if "적용 질문" in opts.sections:

            parts.append(f"5) 적용 질문 (정확히 {opts.application_q}문항)")

        if "끝나는 기도문" in opts.sections:

            a, b = opts.end_prayer_lines; parts.append(f"6) 끝나는 기도문 ({a}–{b}줄)")

        order_str = "\n".join(parts)

  

        length_hint_msg = {"short":"- 전반적으로 간결하게.\n","normal":"","long":"- 설명과 예시를 덧붙여 풍성하게.\n"}[opts.length_hint]

  

        sys = (

            "너는 'px agent'이며 한국어로 고품질 문서를 생성한다. 오직 예배 설교 전사(Transcript)만을 근거로 작성하라.\n"

            "다음 ‘선택된 섹션’만 출력한다. 각 섹션은 Markdown 헤더(#)로 구분하라.\n"

            f"{order_str}\n"

            "- 인용은 1–2문장 이내로만.\n"

            "- 과도한 상투어를 피하고 실제 예배 톤을 유지하라.\n"

            "- '본문 요약' 섹션은 중간 소제목이나 개조식 나열 없이, 소그룹 인도자가 사람들 앞에서 자연스럽게 낭독할 수 있는 부드러운 구어체 설교문(줄글) 형식으로 연결하여 작성하라.\n"

            "- 각 섹션 헤더는 정확히 위 제목을 포함(예: '# 적용 질문').\n"

            + length_hint_msg

        )

  

        seed = transcript[:4000] if transcript else ""

        user = f"[요약본]\n{brief}\n[원문 일부]\n{seed}"

  

        resp = self.client.responses.create(

            model=self._model,

            input=[

                {"role": "system", "content": [{"type": "input_text", "text": sys}]},

                {"role": "user", "content": [{"type": "input_text", "text": user}]},

            ],

            max_output_tokens=self._max_out,

        )

        text = self._collect_text(resp)

  

        text = self._postfix_fill_custom(text, opts)

        return text

  

    # ----------------- 사후 보강 -----------------

    def _postfix_fill_custom(self, text: str, opts: GenOptions) -> str:

        t = text

  

        def _ensure_n(section: str, n: int, prefix: str, base: str) -> str:

            pattern = rf"(#\s*(?:\d+\)\s*)?{section}.*?)(?=\n#\s|\Z)"

            m = re.search(pattern, base, flags=re.S | re.I)

            if not m: return base

            block = m.group(1)

            items = re.findall(r"(?m)^\s*[-*]\s+.+|^\s*\d+[\.)]\s+.+", block)

            if len(items) >= n: return base

            need = n - len(items)

            adds = [f"- {prefix} #{len(items)+i+1}" for i in range(need)]

            if not block.endswith("\n\n"):

                block = block.rstrip() + "\n\n"

            new_block = block + "\n".join(adds) + "\n"

            return base[:m.start(1)] + new_block + base[m.end(1):]

  

        #if "본문 이해 질문" in opts.sections:

        #    t = _ensure_n("본문 이해 질문", opts.understanding_q, "설교 핵심을 확인하기 위한 보충 질문을 작성해 보세요", t)

        #if "적용 질문" in opts.sections:

        #    t = _ensure_n("적용 질문", opts.application_q, "이번 주에 실제로 적용해 볼 행동을 구체적으로 정해 보세요", t)

        if "끝나는 기도문" in opts.sections:

            t = self._ensure_ending_prayer_length_custom(t, lines_range=opts.end_prayer_lines)

        if "시작 기도문" in opts.sections:

            t = self._ensure_starting_prayer_length_custom(t, lines_range=opts.start_prayer_lines)

        return t

  

    def _ensure_starting_prayer_length_custom(self, text: str, lines_range: Tuple[int, int]) -> str:

        m = re.search(r"(#\s*(?:\d+\)\s*)?시작 기도문[^\n]*\n)(.*?)(?=\n#\s|\Z)", text, flags=re.S | re.I)

        if not m: return text

        header, body = m.group(1), m.group(2)

        lines = [ln for ln in body.splitlines() if ln.strip()]

        lo, hi = lines_range

        if len(lines) >= lo: return text

        sys = (f"아래 '시작 기도문' 본문을 {lo}–{hi}줄로 자연스럽게 보강하세요. 톤은 온화하고 구체적이며 과한 상투어를 피하세요. 헤더는 생성하지 마세요.")

        user = f"[본문]\n{body}\n[/본문]"

        try:

            resp = self.client.responses.create(

                model=self._model,

                input=[

                    {"role": "system", "content": [{"type": "input_text", "text": sys}]},

                    {"role": "user", "content": [{"type": "input_text", "text": user}]},

                ],

                max_output_tokens=min(self._max_out, 600),

            )

            new_body = self._collect_text(resp).strip()

            if not new_body.strip() or "(No text output)" in new_body: return text

            return text[:m.start()] + header + new_body + text[m.end():]

        except Exception:

            return text

  

    def _ensure_ending_prayer_length_custom(self, text: str, lines_range: Tuple[int, int]) -> str:

        m = re.search(r"(#\s*(?:\d+\)\s*)?끝나는 기도문[^\n]*\n)(.*?)(?=\n#\s|\Z)", text, flags=re.S | re.I)

        if not m: return text

        header, body = m.group(1), m.group(2)

        lines = [ln for ln in body.splitlines() if ln.strip()]

        lo, hi = lines_range

        if len(lines) >= lo: return text

        sys = (f"아래 '끝나는 기도문' 본문을 {lo}–{hi}줄의 예배 마무리 기도문으로 자연스럽게 보강하세요. 톤은 온화하고 구체적이며 과한 상투어를 피하세요. 헤더는 생성하지 마세요.")

        user = f"[본문]\n{body}\n[/본문]"

        try:

            resp = self.client.responses.create(

                model=self._model,

                input=[

                    {"role": "system", "content": [{"type": "input_text", "text": sys}]},

                    {"role": "user", "content": [{"type": "input_text", "text": user}]},

                ],

                max_output_tokens=min(self._max_out, 600),

            )

            new_body = self._collect_text(resp).strip()

            if not new_body.strip() or "(No text output)" in new_body: return text

            return text[:m.start()] + header + new_body + text[m.end():]

        except Exception:

            return text

  

    # ----------------- 공통 유틸 -----------------

    def _collect_text(self, resp) -> str:

        out = getattr(resp, "output_text", "") or ""

        if out: return out.strip()

        collected: List[str] = []

        for item in getattr(resp, "output", []) or []:

            if getattr(item, "type", None) == "message":

                for p in getattr(item, "content", []) or []:

                    if getattr(p, "type", None) in {"text", "output_text"}:

                        collected.append(getattr(p, "text", "") or "")

        return ("\n".join(collected).strip()) or "(No text output from model)"

  

    def _remember(self, user_text: str, assistant_text: str) -> None:

        ut = redact_pii(user_text)

        at = redact_pii(assistant_text)

        self.memory.add("user", ut)

        self.memory.add("assistant", at)

        self._summarize_memory()

  

    def _summarize_memory(self) -> None:

        window = self.memory.window(12)

        text = "\n".join([f"{m.role}: {m.content}" for m in window])

  

        resp = self.client.responses.create(

            model=self._model,

            input=[

                {

                    "role": "system",

                    "content": [{"type": "input_text",

                                 "text": "Summarize the conversation succinctly for future context. Korean user; keep details that matter for tasks."}],

                },

                {"role": "user", "content": [{"type": "input_text", "text": text}]},

            ],

            max_output_tokens=min(self._mem_sum_tokens, 1000),

        )

        summary = self._collect_text(resp)

        if summary and summary != "(No text output from model)":

            self.memory.summary = summary

agent/config.py

# agent/config.py

from pydantic import BaseModel

import os

from dotenv import load_dotenv

  

load_dotenv()  # .env 로드

  

def _parse_csv_env(name: str, default_list: list[str]) -> list[str]:

    raw = os.getenv(name, "")

    if not raw:

        return default_list

    # 쉼표 분리 + 공백 제거 + 빈 항목 제거

    return [x.strip() for x in raw.split(",") if x.strip()]

  

class Settings(BaseModel):

    # OpenAI

    openai_api_key: str = os.getenv("OPENAI_API_KEY", "")

    model: str = os.getenv("OPENAI_MODEL", "gpt-5")

    reasoning_effort: str = os.getenv("REASONING_EFFORT", "medium")

  

    # 출력 토큰/메모리 요약

    max_output_tokens: int = int(os.getenv("MAX_OUTPUT_TOKENS", "3200"))

    memory_summary_tokens: int = int(os.getenv("MEMORY_SUMMARY_TOKENS", "800"))

  

    # 사용 도구 허용 목록 (기본: STT/Transcript 둘 다)

    tool_allowlist: list[str] = _parse_csv_env(

        "TOOL_ALLOWLIST",

        ["youtube_stt", "youtube_transcript"]

    )

  

    # YouTube STT 옵션

    youtube_chunk_duration_sec: int = int(os.getenv("YT_CHUNK_SEC", "600"))

    youtube_overlap_sec: int = int(os.getenv("YT_OVERLAP_SEC", "1"))

  

    # 이메일(선택)

    recipient_email: str = os.getenv("RECIPIENT_EMAIL", "")  # 수신자 이메일

    smtp_host: str = os.getenv("SMTP_HOST", "")              # e.g., smtp.gmail.com

    smtp_port: int = int(os.getenv("SMTP_PORT", "587"))      # TLS 587 / SSL 465

    smtp_user: str = os.getenv("SMTP_USER", "")              # 로그인(보통 이메일)

    smtp_pass: str = os.getenv("SMTP_PASS", "")              # 앱 비밀번호/SMTP 비번

    from_email: str = os.getenv("FROM_EMAIL", "")            # 발신 표시(미설정 시 smtp_user 사용)

  

settings = Settings()

agent/memory.py

from __future__ import annotations

from typing import List, Dict

from dataclasses import dataclass, field

  

@dataclass

class Message:

    role: str  # 'user' | 'assistant' | 'tool'

    content: str

  

@dataclass

class MemoryStore:

    # Very simple in-memory convo store + rolling summary

    messages: List[Message] = field(default_factory=list)

    summary: str = ""

  

    def add(self, role: str, content: str) -> None:

        self.messages.append(Message(role=role, content=content))

  

    def window(self, limit: int = 10) -> List[Message]:

        return self.messages[-limit:]

  

    def to_responses_input(self, summary_first: bool = True) -> List[Dict]:

        items: List[Dict] = []

        if summary_first and self.summary:

            # f-string 제거: 줄바꿈 포함 안전하게 연결

            items.append({"role": "system", "content": "[Conversation summary]\n" + self.summary})

        for m in self.window(20):

            items.append({"role": m.role, "content": m.content})

        return items

agent/safety.py

import re

from typing import List

from .config import settings

  

PII_PATTERNS = [

    re.compile(r"""\b\d{3}-\d{2}-\d{4}\b"""),  # SSN-like

    re.compile(r"""\b\d{16}\b"""),  # naive card

]

  

def redact_pii(text: str) -> str:

    redacted = text

    for pat in PII_PATTERNS:

        redacted = pat.sub("[REDACTED]", redacted)

    return redacted

  

def is_tool_allowed(tool_name: str) -> bool:

    return tool_name in settings.tool_allowlist

agent/tool_router.py

# agent/tool_router.py

from __future__ import annotations

  

from typing import List, Tuple, Dict, Any, Callable, Optional

import importlib

import traceback

  

from .config import settings  # allowlist를 적용하기 위해 필요

  

# 각 툴은 (spec_dict, run_callable) 형태로 등록

# run_callable 서명: (args: Dict[str, Any]) -> Dict[str, Any]

_REGISTRY: List[Tuple[Dict[str, Any], Callable[[Dict[str, Any]], Dict[str, Any]]]] = []

  
  

def _safe(spec: Dict[str, Any], fn: Callable[[Dict[str, Any]], Dict[str, Any]]) -> None:

    """

    spec 검증 및 등록.

    - 필수 키: name

    - allowlist에 포함된 툴만 등록 (allowlist가 비어 있으면 모두 허용)

    """

    name = spec.get("name")

    if not name or not isinstance(name, str):

        raise AssertionError("Tool spec must include a string 'name' field.")

  

    allow = settings.tool_allowlist or []

    if allow and name not in allow:

        # 허용 목록이 설정되어 있고 이 이름이 없으면 스킵

        return

  

    _REGISTRY.append((spec, fn))

  
  

def _load_tool(module_path: str, expected_name: Optional[str] = None) -> None:

    """

    모듈을 import하고 tool_spec()/run를 가져와 등록.

    expected_name이 주어지면 spec["name"]과 일치 검증.

    """

    mod = importlib.import_module(module_path)

  

    if not hasattr(mod, "tool_spec") or not callable(getattr(mod, "tool_spec")):

        raise AssertionError(f"{module_path} must define a callable tool_spec().")

  

    if not hasattr(mod, "run") or not callable(getattr(mod, "run")):

        raise AssertionError(f"{module_path} must define a callable run(args: dict).")

  

    spec = mod.tool_spec()

    if expected_name and spec.get("name") != expected_name:

        raise AssertionError(f"Tool name mismatch: {expected_name} vs {spec.get('name')}")

  

    _safe(spec, mod.run)

  
  

def register() -> None:

    """

    사용 도구만 로딩. allowlist가 비어있다면 모두 등록 시도, 있으면 그 이름만 등록 시도.

    """

    _REGISTRY.clear()

  

    # 우리가 지원하는 툴 목록(모듈 경로, 기대 이름)

    catalog: List[Tuple[str, str]] = [

        ("agent.tools.youtube_transcript", "youtube_transcript"),

        ("agent.tools.youtube_stt",        "youtube_stt"),

    ]

  

    allow = settings.tool_allowlist or []

    for module_path, expected in catalog:

        if allow and expected not in allow:

            # 허용 목록이 있다면, 해당 이름만 시도

            continue

        try:

            _load_tool(module_path, expected_name=expected)

        except Exception:

            # 개별 툴 실패는 전체를 막지 않도록 삼킵니다.

            # 필요하면 로깅 시스템에 traceback 기록

            # print(traceback.format_exc())  # 개발 중엔 활성화

            pass

  
  

def tool_specs() -> List[Dict[str, Any]]:

    if not _REGISTRY:

        register()

    return [spec for (spec, _) in _REGISTRY]

  
  

def _find_tool(name: str) -> Optional[Tuple[Dict[str, Any], Callable[[Dict[str, Any]], Dict[str, Any]]]]:

    if not _REGISTRY:

        register()

    for (spec, fn) in _REGISTRY:

        if spec.get("name") == name:

            return spec, fn

    return None

  
  

def run_tool(name: str, args: Dict[str, Any]) -> Dict[str, Any]:

    """

    일관 포맷으로 결과 반환:

    - 성공: {"ok": True, "result": <tool_result>, "error": None}

    - 실패: {"ok": False, "result": None, "error": "<메시지>"}

    각 툴의 run도 위 형식을 권장하지만, 여기서도 방어적으로 감쌉니다.

    """

    try:

        found = _find_tool(name)

        if not found:

            return {"ok": False, "result": None, "error": f"Tool not found or not allowed: {name}"}

  

        spec, fn = found

        # allowlist가 런타임 중 변경될 수도 있으니 마지막 방어선

        allow = settings.tool_allowlist or []

        if allow and spec.get("name") not in allow:

            return {"ok": False, "result": None, "error": f"Tool not allowed by settings: {name}"}

  

        # 툴 실행

        raw = fn(args)

  

        # 툴의 반환이 이미 표준 포맷이면 그대로

        if isinstance(raw, dict) and "ok" in raw and ("result" in raw or "error" in raw):

            return raw

  

        # 아니면 감싸서 표준화

        return {"ok": True, "result": raw, "error": None}

  

    except Exception as e:

        # 예외를 안전하게 메시지로 변환

        msg = f"{type(e).__name__}: {e}"

        # print(traceback.format_exc())  # 필요시 로깅

        return {"ok": False, "result": None, "error": msg}

agent/tools/youtube_transcript.py

# agent/tools/youtube_transcript.py

from __future__ import annotations

from typing import Dict, Any, List, Optional

import re

from urllib.parse import urlparse, parse_qs

  

# NEW: 안전한 임포트(패키지 미설치시 깔끔히 에러 반환)

try:

    from youtube_transcript_api import YouTubeTranscriptApi, TranscriptsDisabled, NoTranscriptFound

    _YT_AVAILABLE = True

except Exception:

    YouTubeTranscriptApi = None  # type: ignore

    TranscriptsDisabled = NoTranscriptFound = Exception  # type: ignore

    _YT_AVAILABLE = False

  

_YT_ID_RE = re.compile(r"^[A-Za-z0-9_-]{11}$")

  

def _extract_video_id(url: str) -> Optional[str]:

    # ... (사용하신 robust extractor 그대로) ...

    u = urlparse(url)

    host = (u.netloc or "").lower()

    path = u.path or ""

    qs = parse_qs(u.query or "")

  

    vvals = qs.get("v")

    if vvals:

        vid = vvals[0]

        if _YT_ID_RE.match(vid):

            return vid

  

    if host.endswith("youtu.be"):

        seg = path.strip("/").split("/")

        if seg and _YT_ID_RE.match(seg[0]):

            return seg[0]

  

    segs = [p for p in path.split("/") if p]

    if len(segs) >= 2 and segs[0] in {"live", "embed", "shorts"}:

        cand = segs[1]

        if _YT_ID_RE.match(cand):

            return cand

  

    if segs and _YT_ID_RE.match(segs[-1]):

        return segs[-1]

  

    m = re.search(r"(?:v=|/live/|/embed/|/shorts/|youtu\.be/)([A-Za-z0-9_-]{11})", url)

    if m:

        return m.group(1)

    return None

  

def _flatten_transcript(items: List[Dict[str, Any]]) -> str:

    parts, buf = [], []

    for it in items:

        t = (it.get("text") or "").strip()

        if not t:

            continue

        if t.startswith("[") and t.endswith("]"):

            continue

        buf.append(t)

        if len(" ".join(buf)) > 800:

            parts.append(" ".join(buf)); buf = []

    if buf:

        parts.append(" ".join(buf))

    return "\n".join(parts).strip()

  

def tool_spec() -> Dict[str, Any]:

    return {

        "name": "youtube_transcript",

        "description": "Fetch transcript text from a YouTube URL (ko/en preferred). Returns plain text.",

        "input_schema": {

            "type": "object",

            "properties": {

                "url": {"type": "string", "description": "YouTube video URL"},

                "prefer_langs": {

                    "type": "array", "items": {"type": "string"},

                    "description": "Preferred languages, in order. e.g., ['ko', 'en']",

                    "default": ["ko", "en"],

                },

                "allow_translate": {

                    "type": "boolean",

                    "description": "If no direct transcript in preferred langs, allow YouTube translation.",

                    "default": True,

                },

            },

            "required": ["url"],

            "additionalProperties": False,

        },

    }

  

def run(args: Dict[str, Any]) -> Dict[str, Any]:

    if not _YT_AVAILABLE:

        return {"ok": False, "error": "youtube-transcript-api not installed."}

  

    url = args.get("url", "")

    prefer_langs = args.get("prefer_langs") or ["ko", "en"]

    allow_translate = bool(args.get("allow_translate", True))

  

    vid = _extract_video_id(url)

    if not vid:

        return {"ok": False, "error": "Invalid YouTube URL (cannot extract video id)."}

  

    try:

        # 최신버전 경로: list_transcripts 우선

        try:

            tl = YouTubeTranscriptApi.list_transcripts(vid)

  

            # 1) 선호 언어 직접 자막

            for lang in prefer_langs:

                try:

                    tr = tl.find_transcript([lang])

                    text = _flatten_transcript(tr.fetch())

                    if text:

                        return {"ok": True, "result": {"video_id": vid, "lang": lang, "text": text, "source": "direct"}}

                except Exception:

                    pass

  

            # 2) 번역 허용 시

            if allow_translate:

                for lang in prefer_langs:

                    for candidate in tl:

                        try:

                            text = _flatten_transcript(candidate.translate(lang).fetch())

                            if text:

                                return {"ok": True, "result": {"video_id": vid, "lang": lang, "text": text, "source": "translated"}}

                        except Exception:

                            continue

  

        except AttributeError:

            # 구버전 폴백: get_transcript

            for lang in prefer_langs:

                try:

                    items = YouTubeTranscriptApi.get_transcript(vid, languages=[lang])

                    text = _flatten_transcript(items)

                    if text:

                        return {"ok": True, "result": {"video_id": vid, "lang": lang, "text": text, "source": "direct_legacy"}}

                except Exception:

                    pass

  

        return {"ok": False, "error": "No transcript available in preferred langs (and translation failed)."}

  

    except TranscriptsDisabled:

        return {"ok": False, "error": "Transcripts disabled for this video."}

    except NoTranscriptFound:

        return {"ok": False, "error": "No transcript found for this video."}

    except Exception as e:

        return {"ok": False, "error": f"Unexpected error: {e}"}

agent/tools/youtube_stt.py

# agent/tools/youtube_stt.py

from __future__ import annotations

from typing import Dict, Any, List, Optional, Tuple

import os

import re

import shutil

import tempfile

import subprocess

  

import openai

  

# -----------------------------

# Utilities

# -----------------------------

def _which(cmd: str) -> Optional[str]:

    return shutil.which(cmd)

  

def _get_ffmpeg_paths() -> Tuple[str, str]:

    ffmpeg = os.getenv("FFMPEG_PATH") or _which("ffmpeg")

    ffprobe = os.getenv("FFPROBE_PATH") or _which("ffprobe")

    if not ffmpeg or not ffprobe:

        raise RuntimeError("ffmpeg/ffprobe not found. Set FFMPEG_PATH/FFPROBE_PATH or add to PATH.")

    return ffmpeg, ffprobe

  

def _run(cmd: List[str]) -> subprocess.CompletedProcess:

    return subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, check=True)

  

def _probe_duration(ffprobe: str, media_path: str) -> float:

    cmd = [

        ffprobe, "-v", "error",

        "-show_entries", "format=duration",

        "-of", "default=noprint_wrappers=1:nokey=1",

        media_path,

    ]

    out = _run(cmd).stdout.decode("utf-8", "ignore").strip()

    try:

        return float(out)

    except Exception:

        return 0.0

  

def _hhmmss_to_seconds(s: Optional[str]) -> Optional[int]:

    if not s:

        return None

    try:

        h, m, sec = s.strip().split(":")

        return int(h) * 3600 + int(m) * 60 + int(sec)

    except Exception:

        return None

  

def _seconds_to_hhmmss(x: int) -> str:

    h = x // 3600

    m = (x % 3600) // 60

    s = x % 60

    return f"{h:02d}:{m:02d}:{s:02d}"

  

def _yt_download_audio(url: str, outdir: str) -> str:

    """

    Download best-available audio using yt-dlp with SABR 대응 옵션.

    Returns local file path.

    """

    ytdlp = _which("yt-dlp") or _which("youtube-dl")

    if not ytdlp:

        raise RuntimeError("yt-dlp (or youtube-dl) is not installed or not in PATH.")

  

    outtpl = os.path.join(outdir, "%(id)s.%(ext)s")

    cmd = [

        ytdlp,

        "-N", "8",

        "--no-check-formats",

        "--hls-prefer-ffmpeg",

        "-f", "bestaudio/best",

        "--no-playlist",

        "-o", outtpl,

        url,

    ]

    _run(cmd)

  

    cand = []

    for fn in os.listdir(outdir):

        p = os.path.join(outdir, fn)

        if os.path.isfile(p) and any(fn.lower().endswith(ext) for ext in (".webm", ".m4a", ".mp4", ".mp3")):

            cand.append((os.path.getsize(p), p))

    if not cand:

        raise RuntimeError("Audio download failed: no audio file found.")

    cand.sort(reverse=True)

    return cand[0][1]

  

def _ensure_wav(ffmpeg: str, in_path: str, out_path: str) -> None:

    """

    Normalize to 16kHz mono PCM WAV to keep STT chunks small.

    """

    cmd = [

        ffmpeg, "-y", "-i", in_path,

        "-ac", "1", "-ar", "16000",

        "-vn", "-c:a", "pcm_s16le",

        out_path,

    ]

    _run(cmd)

  

def _slice_wav(ffmpeg: str, wav_path: str, start: int, end: int, out_path: str) -> None:

    dur = max(1, end - start)

    cmd = [

        ffmpeg, "-y",

        "-ss", str(start),

        "-t", str(dur),

        "-i", wav_path,

        "-ac", "1", "-ar", "16000",

        "-vn", "-c:a", "pcm_s16le",

        out_path,

    ]

    _run(cmd)

  

def _chunk_offsets(total: int, chunk_sec: int, overlap_sec: int = 0) -> List[Tuple[int, int]]:

    if chunk_sec <= 1:

        return [(0, total)]

    offs = []

    step = max(1, chunk_sec - max(0, overlap_sec))

    cur = 0

    while cur < total:

        end = min(total, cur + chunk_sec)

        offs.append((cur, end))

        if end >= total:

            break

        cur += step

    return offs

  

# -----------------------------

# OpenAI STT

# -----------------------------

def _openai_client() -> openai.OpenAI:

    api_key = os.getenv("OPENAI_API_KEY", "")

    if not api_key:

        raise RuntimeError("OPENAI_API_KEY is not set.")

    return openai.OpenAI(api_key=api_key)

  

def _transcribe_file(client: openai.OpenAI, file_path: str, model: str, language: Optional[str]) -> str:

    with open(file_path, "rb") as f:

        resp = client.audio.transcriptions.create(

            model=model,

            file=f,

            language=language or None,

        )

    return getattr(resp, "text", "") or ""

  

# -----------------------------

# Public Tool API

# -----------------------------

def tool_spec() -> Dict[str, Any]:

    return {

        "name": "youtube_stt",

        "description": "Download YouTube audio and transcribe only the requested sermon segment.",

        "input_schema": {

            "type": "object",

            "properties": {

                "url": {"type": "string", "description": "YouTube video URL"},

                "lang_hint": {"type": "string", "description": "Language hint (e.g., 'ko').", "default": "ko"},

                "sermon_start": {"type": "string", "description": "Start time HH:MM:SS (optional, default 00:00:00)"},

                "sermon_end": {"type": "string", "description": "End time HH:MM:SS (optional, default video end)"},

                "chunk_duration_sec": {"type": "integer", "description": "Chunk seconds for upload.", "default": 600},

                "overlap_sec": {"type": "integer", "description": "Overlap seconds between chunks.", "default": 1},

            },

            "required": ["url"],

            "additionalProperties": False,

        },

    }

  

def run(args: Dict[str, Any]) -> Dict[str, Any]:

    """

    자동탐지 없음.

      - sermon_start: 없으면 0초

      - sermon_end  : 없으면 영상(정규화 WAV) 끝까지

    지정 구간만 추출해 청크 분할 후 STT하여 결합.

    """

    url = args.get("url", "")

    if not url:

        return {"ok": False, "error": "Missing 'url'."}

  

    lang_hint = (args.get("lang_hint") or "ko").strip()

    chunk_sec = int(args.get("chunk_duration_sec") or 600)

    overlap_sec = int(args.get("overlap_sec") or 1)

    transcribe_model = os.getenv("OPENAI_TRANSCRIBE_MODEL", "gpt-4o-mini-transcribe")

  

    try:

        ffmpeg, ffprobe = _get_ffmpeg_paths()

    except Exception as e:

        return {"ok": False, "error": str(e)}

  

    try:

        client = _openai_client()

    except Exception as e:

        return {"ok": False, "error": f"OpenAI client error: {e}"}

  

    work = tempfile.mkdtemp(prefix="ytstt_")

    try:

        # 1) 오디오 다운로드

        audio_path = _yt_download_audio(url, work)

  

        # 2) WAV 정규화

        norm_wav = os.path.join(work, "audio_16k.wav")

        _ensure_wav(ffmpeg, audio_path, norm_wav)

  

        total_sec = int(round(_probe_duration(ffprobe, norm_wav)))

  

        # 3) 시간 해석 (시작 없으면 0, 끝 없으면 total)

        start_s = _hhmmss_to_seconds(args.get("sermon_start"))

        end_s = _hhmmss_to_seconds(args.get("sermon_end"))

        used_start = max(0, start_s or 0)

        used_end = min(total_sec, end_s if end_s is not None else total_sec)

        if used_end <= used_start + 2:

            return {"ok": False, "error": "Invalid time window. Please check start/end times."}

  

        # 4) 지정 구간 추출

        sliced_path = os.path.join(work, "sermon_window.wav")

        _slice_wav(ffmpeg, norm_wav, used_start, used_end, sliced_path)

  

        win_len = int(round(_probe_duration(ffprobe, sliced_path)))

        if win_len <= 1:

            return {"ok": False, "error": "Chosen window is too short to transcribe."}

  

        # 5) 청크 분할 + STT

        offs = _chunk_offsets(win_len, chunk_sec, overlap_sec=max(0, overlap_sec))

        out_texts: List[str] = []

        segments_meta: List[Dict[str, Any]] = []

  

        for i, (a, b) in enumerate(offs, start=1):

            chunk_path = os.path.join(work, f"chunk_{i:03d}.wav")

            _slice_wav(ffmpeg, sliced_path, a, b, chunk_path)

  

            try:

                text = _transcribe_file(client, chunk_path, transcribe_model, lang_hint or None)

            except Exception as e:

                return {"ok": False, "error": f"stt failed on chunk {i}: {e}"}

  

            abs_start = used_start + a

            abs_end = used_start + b

            tag = f"[{_seconds_to_hhmmss(abs_start)}–{_seconds_to_hhmmss(abs_end)}]"

            out_texts.append(f"{tag}\n{text.strip()}\n")

            segments_meta.append({

                "index": i,

                "local_start": a,

                "local_end": b,

                "abs_start": _seconds_to_hhmmss(abs_start),

                "abs_end": _seconds_to_hhmmss(abs_end),

                "chars": len(text or ""),

            })

  

        final_text = ("\n".join(out_texts)).strip()

        if not final_text:

            return {"ok": False, "error": "Empty transcription result."}

  

        return {

            "ok": True,

            "result": {

                "text": final_text,

                "used_range": {

                    "start_hhmmss": _seconds_to_hhmmss(used_start),

                    "end_hhmmss": _seconds_to_hhmmss(used_end),

                },

                "segments": segments_meta,

            },

        }

  

    except subprocess.CalledProcessError as e:

        err = (e.stderr or b"").decode("utf-8", "ignore")

        return {"ok": False, "error": f"Command failed: {err.strip() or e}"}

    except Exception as e:

        return {"ok": False, "error": f"stt failed: {e}"}

    finally:

        try:

            shutil.rmtree(work, ignore_errors=True)

        except Exception:

            pass